Iceberg 1.11 support for Spark 411, part (1/3): extract version-divergent scan APIs behind a shim#14881
Conversation
gerashegalov
left a comment
There was a problem hiding this comment.
This should ideally build on top of #14866
|
build |
Greptile SummaryThis PR is a preparatory refactor for Iceberg 1.11.x support (no behavior change for 1.6.x/1.9.x/1.10.x). It hides API surface that diverges between Iceberg versions behind a small shim interface, and updates reflective field/method access in
Confidence Score: 5/5Safe to merge — no behavior change for existing Iceberg versions; the refactor correctly preserves all existing functionality while laying groundwork for 1.11.x. All three per-version GpuSparkCopyOnWriteScan implementations are structurally identical to the old common class, the new factory dispatch is a straightforward delegation, and the reflective fallback logic in GpuSparkScanAccess is clearly reasoned with both candidate names listed and well-commented. The two observations flagged are minor diagnostic ergonomics and a theoretical field-priority edge case that doesn't affect any current caller. GpuSparkScanAccess.java — the invokeMethod exception-wrapping tweak and the field-search priority comment are worth a look before the 1.11.x follow-up lands. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["GpuSparkScan.tryConvert(cpuScan)"] --> B{isCopyOnWriteScan?}
B -- Yes --> C["ShimUtils.newCopyOnWriteScan()"]
B -- No --> D{isBatchQueryScan?}
D -- Yes --> E["new GpuSparkBatchQueryScan"]
D -- No --> F["IllegalArgumentException"]
C --> G["IcebergShimUtils.newCopyOnWriteScan() [interface]"]
G --> H1["iceberg-1-6-x ShimUtilsImpl"]
G --> H2["iceberg-1-9-x ShimUtilsImpl"]
G --> H3["iceberg-1-10-x ShimUtilsImpl"]
G --> H4["iceberg-1-11-x ShimUtilsImpl (future)"]
H1 --> I1["GpuSparkCopyOnWriteScan 1.6.x\nextends GpuSparkCopyOnWriteScanBase\nwith SupportsRuntimeFiltering / filter(Filter[])"]
H2 --> I2["GpuSparkCopyOnWriteScan 1.9.x\nextends GpuSparkCopyOnWriteScanBase\nwith SupportsRuntimeFiltering / filter(Filter[])"]
H3 --> I3["GpuSparkCopyOnWriteScan 1.10.x\nextends GpuSparkCopyOnWriteScanBase\nwith SupportsRuntimeFiltering / filter(Filter[])"]
H4 --> I4["GpuSparkCopyOnWriteScan 1.11.x\nextends GpuSparkCopyOnWriteScanBase\nwith SupportsRuntimeV2Filtering / filter(Predicate[])"]
subgraph common["iceberg/common (version-agnostic)"]
A
B
C
D
E
F
BASE["GpuSparkCopyOnWriteScanBase (abstract)\nestimateStatistics / equals / hashCode / toString"]
end
I1 & I2 & I3 & I4 --> BASE
Reviews (2): Last reviewed commit: "Iceberg: prepare scan/bridge layer for 1..." | Re-trigger Greptile |
4647fc3 to
ac790ba
Compare
This PR has no behavior change for the Iceberg versions currently shipped (1.6.x / 1.9.x / 1.10.x). It makes two refactors that are required by the upcoming iceberg-1-11-x module: 1) GpuSparkScanAccess version-tolerant via reflection. NVIDIA#14866 introduced GpuSparkScanAccess as a root-loadable bridge to Iceberg's package-private scan classes. Three of its methods called SparkScan.branch() / expectedSchema() / filterExpressions(), which Iceberg 1.11 removed/renamed (the latter two became projection() and filters() respectively). One field read used "runtimeFilterExpressions", which 1.11 renamed to "runtimeFilters" on the new SparkRuntimeFilterableScan parent class. Switching to reflection with priority-ordered candidate names lets the same common-code bridge compile and run against any Iceberg version. readField was extended to accept varargs field names; a new invokeMethod helper does the same for protected methods. 2) Per-Iceberg-version GpuSparkCopyOnWriteScan subclass. NVIDIA#14866's GpuSparkCopyOnWriteScan in common hardcodes SupportsRuntimeFiltering + filter(Filter[]). Iceberg 1.11 switches SparkCopyOnWriteScan to SupportsRuntimeV2Filtering + filter(Predicate[]), so the hardcode prevents 1.11 from honoring runtime filtering. GpuSparkCopyOnWriteScan -> renamed to abstract GpuSparkCopyOnWriteScanBase. The runtime-filter trait and filter() method live on a per-Iceberg-version concrete subclass shipped from each iceberg-1-N-x module. 1.6 / 1.9 / 1.10 mix in SupportsRuntimeFiltering with filter(Filter[]) (zero behavior change). 1.11 will mix in SupportsRuntimeV2Filtering + filter(Predicate[]) in a follow-up PR. A new ShimUtils.newCopyOnWriteScan(Scan, RapidsConf, boolean) factory routes GpuSparkScan.tryConvert through the per-version ShimUtilsImpl so common code does not need to know which subclass to construct. Verified by building buildver=350 (iceberg-1-6-x), buildver=356 (iceberg-1-9-x + iceberg-1-10-x co-shipped), and the full reactor through this commit. Signed-off-by: Chong Gao <res_life@163.com>
ac790ba to
a30d588
Compare
|
build |
| * Iceberg 1.10.x copy-on-write scan: {@code SupportsRuntimeFiltering} with | ||
| * {@code filter(Array[Filter])}. | ||
| */ | ||
| class GpuSparkCopyOnWriteScan( |
There was a problem hiding this comment.
Non-blocking concision: the 1.6/1.9/1.10 implementations are identical apart from the version in the Scaladoc, and this class only depends on public Scan + SupportsRuntimeFiltering. Could the V1 path live once in common, e.g. GpuSparkCopyOnWriteV1Scan, with all three ShimUtilsImpls instantiating it? Then only the future 1.11 V2 path needs a version-specific class.
| return sparkScan(scan).branch(); | ||
| // Iceberg 1.10.x and earlier: protected method SparkScan.branch(). Iceberg 1.11.x | ||
| // removed it entirely; return null for display purposes. | ||
| return invokeMethod(sparkScan(scan), String.class, "branch"); |
There was a problem hiding this comment.
Non-blocking: for the 1.11 follow-up, this will render branch=null for branch reads because Iceberg 1.11 removed SparkScan.branch() but SparkBatchQueryScan/SparkCopyOnWriteScan still carry a private branch field and include it in description(). Should this accessor fall back to reading the branch field, or should the GPU scan toString use cpuScan.description() as the PR description says?
Stacked work for #14853 (1/3) — common-code preparation for adding iceberg-1-11-x.
Depends on
Description
Refactors
iceberg/commonso theSparkScan/SparkCopyOnWriteScan/SparkBatch/DataWriteResultAPIs that diverge between Iceberg 1.10.x and 1.11.x are hidden behind a small interface, with per-Iceberg-version implementations iniceberg-1-6-x/iceberg-1-9-x/iceberg-1-10-x. No behavior change for the Iceberg versions this PR ships; sets the stage for the follow-up PR that addsiceberg-1-11-x.Common changes:
GpuSparkCopyOnWriteScan→ renamed toGpuSparkCopyOnWriteScanBase(abstract). The runtime-filter trait +filtermethod live in a per-version concrete subclass (1.6/1.9/1.10 mix inSupportsRuntimeFilteringwithfilter(Filter[]); 1.11 will mix inSupportsRuntimeV2Filteringwithfilter(Predicate[])).GpuSparkScan: rewritehasNestedTypevia Spark'sreadSchema()+ Spark types so it no longer depends on the 1.10-onlycpuScan.expectedSchema(). DispatchSparkCopyOnWriteScanconstruction through the newShimUtils.newCopyOnWriteScanfactory.GpuSparkBatchQueryScan.toStringusescpuScan.description()(available in both 1.10 and 1.11) instead ofbranch/expectedSchema/filterExpressions(1.11 removed these).GpuSparkBatchQueryScan.runtimeFilterExpressionsreflective field-read tolerates both the 1.10 name (runtimeFilterExpressions) and the 1.11 name (runtimeFilters).GpuSparkBatch: same tolerance forexpectedSchema(1.10) vsprojection(1.11).GpuSparkWrite: type-annotatenew Array[DataFile](0)so Scala 2.13 doesn't inferArray[Nothing]under 1.11's wildcardedDataWriteResult.dataFiles().IcebergShimUtils/ShimUtils: addnewCopyOnWriteScan(Scan, RapidsConf, Boolean): GpuScanfactory. The parameter is Spark's publicScanbecause Iceberg'sSparkCopyOnWriteScanis package-private — cross-package callers cannot reference it directly.Per-Iceberg-version module changes (1.6 / 1.9 / 1.10, all identical for the V1 path):
GpuSparkCopyOnWriteScaninorg.apache.iceberg.spark.source(so it can reference the package-privateSparkCopyOnWriteScan). Companion object exposescreate(Scan, ...): GpuScanfor cross-package callers.ShimUtilsImpl.javaimplementsnewCopyOnWriteScanviaGpuSparkCopyOnWriteScan.create.The two
try/catchfield-name fallbacks (inGpuSparkBatchQueryScanandGpuSparkBatch) are tactical and will be pushed behind proper per-versionIcebergShimUtilsmethods in a later cleanup PR.Checklists
Documentation
Testing
(3.5.x + 4.0.x iceberg integration tests in `integration_tests/src/main/python/iceberg/` — exercises the new dispatch path with no behavior change vs. before this PR.)
Performance